Skip to content

feat: add hybrid memory backend and mysql rds support#42

Open
ashione wants to merge 1 commit intomainfrom
codex/extend-memory-capabilities-to-redis-and-rds-v5alyq
Open

feat: add hybrid memory backend and mysql rds support#42
ashione wants to merge 1 commit intomainfrom
codex/extend-memory-capabilities-to-redis-and-rds-v5alyq

Conversation

@ashione
Copy link
Copy Markdown
Owner

@ashione ashione commented Aug 24, 2025

Summary

  • combine Redis cache with RDS persistence via HybridMemory
  • register hybrid backend in factory and expose default config
  • allow RDS memory to connect to MySQL via db_url
  • migrate RDS backend to SQLAlchemy to avoid hand-written SQL
  • document optional SQL dependencies for RDS memory in project READMEs
  • declare redis and SQL extras in dependency configuration
  • format memory modules and tests to satisfy lint

Testing

  • CI=true bash scripts/precommit.sh
  • PYTHONPATH=$PWD pytest

https://chatgpt.com/codex/tasks/task_e_68aa685e00e48322ae6eb23907e304ab

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces hybrid memory support by combining Redis caching with RDS persistence, along with standalone Redis and RDS memory backends.

  • Adds three new memory backend implementations: RedisMemory, RDSMemory, and HybridMemory
  • Migrates RDS backend to SQLAlchemy with MySQL support via db_url configuration
  • Updates dependency configuration to include optional SQL and Redis dependencies

Reviewed Changes

Copilot reviewed 14 out of 15 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
vertex_flow/tests/test_memory_redis.py Test suite for RedisMemory with mock Redis client
vertex_flow/tests/test_memory_rds.py Test suite for RDSMemory with SQLite in-memory database
vertex_flow/tests/test_memory_hybrid.py Test suite for HybridMemory combining Redis and RDS backends
vertex_flow/tests/test_memory_factory.py Updated factory tests to include new memory backends
vertex_flow/memory/redis_store.py Redis-based memory implementation with JSON serialization
vertex_flow/memory/rds_store.py SQLAlchemy-based RDS memory with SQLite/MySQL support
vertex_flow/memory/hybrid_store.py Hybrid implementation using Redis for cache and RDS for persistence
vertex_flow/memory/factory.py Factory registration for new memory backends with default configs
vertex_flow/memory/init.py Updated module exports to include new memory classes
setup.py Added memory extra dependencies for Redis and SQL support
pyproject.toml Added memory extra dependencies configuration
README_ZH.md Updated Chinese documentation for optional SQL dependencies
README_EN.md Updated English documentation for optional SQL dependencies
README.md Updated main documentation for optional SQL dependencies

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

Comment on lines +6 to +96


class DummyPipeline:
def __init__(self, client):
self._client = client
self._commands = []

def lpush(self, *args):
self._commands.append(("lpush", args))
return self

def ltrim(self, *args):
self._commands.append(("ltrim", args))
return self

def incr(self, *args):
self._commands.append(("incr", args))
return self

def expire(self, *args):
self._commands.append(("expire", args))
return self

def execute(self):
results = []
for cmd, args in self._commands:
results.append(getattr(self._client, cmd)(*args))
self._commands.clear()
return results


class DummyRedis:
def __init__(self):
self._store = {}
self._lists = {}

def _check_expired(self, key):
if key in self._store:
value, exp = self._store[key]
if exp is not None and time.time() > exp:
del self._store[key]

def set(self, key, value, nx=False, ex=None):
self._check_expired(key)
if nx and key in self._store:
return None
expires_at = time.time() + ex if ex else None
self._store[key] = (value, expires_at)
return True

def get(self, key):
self._check_expired(key)
if key not in self._store:
return None
return self._store[key][0]

def delete(self, key):
self._store.pop(key, None)

def lpush(self, key, value):
self._lists.setdefault(key, [])
self._lists[key].insert(0, value)

def ltrim(self, key, start, end):
self._lists.setdefault(key, [])
self._lists[key] = self._lists[key][start : end + 1]

def lrange(self, key, start, end):
lst = self._lists.get(key, [])
if end == -1:
end = len(lst) - 1
return lst[start : end + 1]

def pipeline(self):
return DummyPipeline(self)

def incr(self, key):
self._check_expired(key)
value = int(self._store.get(key, ("0", None))[0]) + 1
_, exp = self._store.get(key, (None, None))
self._store[key] = (str(value), exp)
return value

def expire(self, key, ttl):
if key in self._store:
value, _ = self._store[key]
self._store[key] = (value, time.time() + ttl)
return True
return False


Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DummyPipeline and DummyRedis classes are duplicated across multiple test files. Consider extracting these into a shared test utility module to reduce code duplication and improve maintainability.

Suggested change
class DummyPipeline:
def __init__(self, client):
self._client = client
self._commands = []
def lpush(self, *args):
self._commands.append(("lpush", args))
return self
def ltrim(self, *args):
self._commands.append(("ltrim", args))
return self
def incr(self, *args):
self._commands.append(("incr", args))
return self
def expire(self, *args):
self._commands.append(("expire", args))
return self
def execute(self):
results = []
for cmd, args in self._commands:
results.append(getattr(self._client, cmd)(*args))
self._commands.clear()
return results
class DummyRedis:
def __init__(self):
self._store = {}
self._lists = {}
def _check_expired(self, key):
if key in self._store:
value, exp = self._store[key]
if exp is not None and time.time() > exp:
del self._store[key]
def set(self, key, value, nx=False, ex=None):
self._check_expired(key)
if nx and key in self._store:
return None
expires_at = time.time() + ex if ex else None
self._store[key] = (value, expires_at)
return True
def get(self, key):
self._check_expired(key)
if key not in self._store:
return None
return self._store[key][0]
def delete(self, key):
self._store.pop(key, None)
def lpush(self, key, value):
self._lists.setdefault(key, [])
self._lists[key].insert(0, value)
def ltrim(self, key, start, end):
self._lists.setdefault(key, [])
self._lists[key] = self._lists[key][start : end + 1]
def lrange(self, key, start, end):
lst = self._lists.get(key, [])
if end == -1:
end = len(lst) - 1
return lst[start : end + 1]
def pipeline(self):
return DummyPipeline(self)
def incr(self, key):
self._check_expired(key)
value = int(self._store.get(key, ("0", None))[0]) + 1
_, exp = self._store.get(key, (None, None))
self._store[key] = (str(value), exp)
return value
def expire(self, key, ttl):
if key in self._store:
value, _ = self._store[key]
self._store[key] = (value, time.time() + ttl)
return True
return False
from vertex_flow.tests.test_utils import DummyPipeline, DummyRedis

Copilot uses AI. Check for mistakes.
Comment on lines +6 to +96


class DummyPipeline:
def __init__(self, client):
self._client = client
self._commands = []

def lpush(self, *args):
self._commands.append(("lpush", args))
return self

def ltrim(self, *args):
self._commands.append(("ltrim", args))
return self

def incr(self, *args):
self._commands.append(("incr", args))
return self

def expire(self, *args):
self._commands.append(("expire", args))
return self

def execute(self):
results = []
for cmd, args in self._commands:
results.append(getattr(self._client, cmd)(*args))
self._commands.clear()
return results


class DummyRedis:
def __init__(self):
self._store = {}
self._lists = {}

def _check_expired(self, key):
if key in self._store:
value, exp = self._store[key]
if exp is not None and time.time() > exp:
del self._store[key]

def set(self, key, value, nx=False, ex=None):
self._check_expired(key)
if nx and key in self._store:
return None
expires_at = time.time() + ex if ex else None
self._store[key] = (value, expires_at)
return True

def get(self, key):
self._check_expired(key)
if key not in self._store:
return None
return self._store[key][0]

def delete(self, key):
self._store.pop(key, None)

def lpush(self, key, value):
self._lists.setdefault(key, [])
self._lists[key].insert(0, value)

def ltrim(self, key, start, end):
self._lists.setdefault(key, [])
self._lists[key] = self._lists[key][start : end + 1]

def lrange(self, key, start, end):
lst = self._lists.get(key, [])
if end == -1:
end = len(lst) - 1
return lst[start : end + 1]

def pipeline(self):
return DummyPipeline(self)

def incr(self, key):
self._check_expired(key)
value = int(self._store.get(key, ("0", None))[0]) + 1
_, exp = self._store.get(key, (None, None))
self._store[key] = (str(value), exp)
return value

def expire(self, key, ttl):
if key in self._store:
value, _ = self._store[key]
self._store[key] = (value, time.time() + ttl)
return True
return False


Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DummyPipeline and DummyRedis classes are duplicated across multiple test files. Consider extracting these into a shared test utility module to reduce code duplication and improve maintainability.

Suggested change
class DummyPipeline:
def __init__(self, client):
self._client = client
self._commands = []
def lpush(self, *args):
self._commands.append(("lpush", args))
return self
def ltrim(self, *args):
self._commands.append(("ltrim", args))
return self
def incr(self, *args):
self._commands.append(("incr", args))
return self
def expire(self, *args):
self._commands.append(("expire", args))
return self
def execute(self):
results = []
for cmd, args in self._commands:
results.append(getattr(self._client, cmd)(*args))
self._commands.clear()
return results
class DummyRedis:
def __init__(self):
self._store = {}
self._lists = {}
def _check_expired(self, key):
if key in self._store:
value, exp = self._store[key]
if exp is not None and time.time() > exp:
del self._store[key]
def set(self, key, value, nx=False, ex=None):
self._check_expired(key)
if nx and key in self._store:
return None
expires_at = time.time() + ex if ex else None
self._store[key] = (value, expires_at)
return True
def get(self, key):
self._check_expired(key)
if key not in self._store:
return None
return self._store[key][0]
def delete(self, key):
self._store.pop(key, None)
def lpush(self, key, value):
self._lists.setdefault(key, [])
self._lists[key].insert(0, value)
def ltrim(self, key, start, end):
self._lists.setdefault(key, [])
self._lists[key] = self._lists[key][start : end + 1]
def lrange(self, key, start, end):
lst = self._lists.get(key, [])
if end == -1:
end = len(lst) - 1
return lst[start : end + 1]
def pipeline(self):
return DummyPipeline(self)
def incr(self, key):
self._check_expired(key)
value = int(self._store.get(key, ("0", None))[0]) + 1
_, exp = self._store.get(key, (None, None))
self._store[key] = (str(value), exp)
return value
def expire(self, key, ttl):
if key in self._store:
value, _ = self._store[key]
self._store[key] = (value, time.time() + ttl)
return True
return False
from vertex_flow.tests.test_utils import DummyPipeline, DummyRedis

Copilot uses AI. Check for mistakes.
Comment on lines +10 to +100


class DummyPipeline:
def __init__(self, client):
self._client = client
self._commands = []

def lpush(self, *args):
self._commands.append(("lpush", args))
return self

def ltrim(self, *args):
self._commands.append(("ltrim", args))
return self

def incr(self, *args):
self._commands.append(("incr", args))
return self

def expire(self, *args):
self._commands.append(("expire", args))
return self

def execute(self):
results = []
for cmd, args in self._commands:
results.append(getattr(self._client, cmd)(*args))
self._commands.clear()
return results


class DummyRedis:
def __init__(self):
self._store = {}
self._lists = {}

def _check_expired(self, key):
if key in self._store:
value, exp = self._store[key]
if exp is not None and time.time() > exp:
del self._store[key]

def set(self, key, value, nx=False, ex=None):
self._check_expired(key)
if nx and key in self._store:
return None
expires_at = time.time() + ex if ex else None
self._store[key] = (value, expires_at)
return True

def get(self, key):
self._check_expired(key)
if key not in self._store:
return None
return self._store[key][0]

def delete(self, key):
self._store.pop(key, None)

def lpush(self, key, value):
self._lists.setdefault(key, [])
self._lists[key].insert(0, value)

def ltrim(self, key, start, end):
self._lists.setdefault(key, [])
self._lists[key] = self._lists[key][start : end + 1]

def lrange(self, key, start, end):
lst = self._lists.get(key, [])
if end == -1:
end = len(lst) - 1
return lst[start : end + 1]

def pipeline(self):
return DummyPipeline(self)

def incr(self, key):
self._check_expired(key)
value = int(self._store.get(key, ("0", None))[0]) + 1
_, exp = self._store.get(key, (None, None))
self._store[key] = (str(value), exp)
return value

def expire(self, key, ttl):
if key in self._store:
value, _ = self._store[key]
self._store[key] = (value, time.time() + ttl)
return True
return False


Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DummyPipeline and DummyRedis classes are duplicated from the Redis test file. Consider extracting these into a shared test utility module to reduce code duplication and improve maintainability.

Suggested change
class DummyPipeline:
def __init__(self, client):
self._client = client
self._commands = []
def lpush(self, *args):
self._commands.append(("lpush", args))
return self
def ltrim(self, *args):
self._commands.append(("ltrim", args))
return self
def incr(self, *args):
self._commands.append(("incr", args))
return self
def expire(self, *args):
self._commands.append(("expire", args))
return self
def execute(self):
results = []
for cmd, args in self._commands:
results.append(getattr(self._client, cmd)(*args))
self._commands.clear()
return results
class DummyRedis:
def __init__(self):
self._store = {}
self._lists = {}
def _check_expired(self, key):
if key in self._store:
value, exp = self._store[key]
if exp is not None and time.time() > exp:
del self._store[key]
def set(self, key, value, nx=False, ex=None):
self._check_expired(key)
if nx and key in self._store:
return None
expires_at = time.time() + ex if ex else None
self._store[key] = (value, expires_at)
return True
def get(self, key):
self._check_expired(key)
if key not in self._store:
return None
return self._store[key][0]
def delete(self, key):
self._store.pop(key, None)
def lpush(self, key, value):
self._lists.setdefault(key, [])
self._lists[key].insert(0, value)
def ltrim(self, key, start, end):
self._lists.setdefault(key, [])
self._lists[key] = self._lists[key][start : end + 1]
def lrange(self, key, start, end):
lst = self._lists.get(key, [])
if end == -1:
end = len(lst) - 1
return lst[start : end + 1]
def pipeline(self):
return DummyPipeline(self)
def incr(self, key):
self._check_expired(key)
value = int(self._store.get(key, ("0", None))[0]) + 1
_, exp = self._store.get(key, (None, None))
self._store[key] = (str(value), exp)
return value
def expire(self, key, ttl):
if key in self._store:
value, _ = self._store[key]
self._store[key] = (value, time.time() + ttl)
return True
return False
from vertex_flow.tests.test_utils import DummyPipeline, DummyRedis

Copilot uses AI. Check for mistakes.
Comment on lines +10 to +100


class DummyPipeline:
def __init__(self, client):
self._client = client
self._commands = []

def lpush(self, *args):
self._commands.append(("lpush", args))
return self

def ltrim(self, *args):
self._commands.append(("ltrim", args))
return self

def incr(self, *args):
self._commands.append(("incr", args))
return self

def expire(self, *args):
self._commands.append(("expire", args))
return self

def execute(self):
results = []
for cmd, args in self._commands:
results.append(getattr(self._client, cmd)(*args))
self._commands.clear()
return results


class DummyRedis:
def __init__(self):
self._store = {}
self._lists = {}

def _check_expired(self, key):
if key in self._store:
value, exp = self._store[key]
if exp is not None and time.time() > exp:
del self._store[key]

def set(self, key, value, nx=False, ex=None):
self._check_expired(key)
if nx and key in self._store:
return None
expires_at = time.time() + ex if ex else None
self._store[key] = (value, expires_at)
return True

def get(self, key):
self._check_expired(key)
if key not in self._store:
return None
return self._store[key][0]

def delete(self, key):
self._store.pop(key, None)

def lpush(self, key, value):
self._lists.setdefault(key, [])
self._lists[key].insert(0, value)

def ltrim(self, key, start, end):
self._lists.setdefault(key, [])
self._lists[key] = self._lists[key][start : end + 1]

def lrange(self, key, start, end):
lst = self._lists.get(key, [])
if end == -1:
end = len(lst) - 1
return lst[start : end + 1]

def pipeline(self):
return DummyPipeline(self)

def incr(self, key):
self._check_expired(key)
value = int(self._store.get(key, ("0", None))[0]) + 1
_, exp = self._store.get(key, (None, None))
self._store[key] = (str(value), exp)
return value

def expire(self, key, ttl):
if key in self._store:
value, _ = self._store[key]
self._store[key] = (value, time.time() + ttl)
return True
return False


Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DummyPipeline and DummyRedis classes are duplicated from the Redis test file. Consider extracting these into a shared test utility module to reduce code duplication and improve maintainability.

Suggested change
class DummyPipeline:
def __init__(self, client):
self._client = client
self._commands = []
def lpush(self, *args):
self._commands.append(("lpush", args))
return self
def ltrim(self, *args):
self._commands.append(("ltrim", args))
return self
def incr(self, *args):
self._commands.append(("incr", args))
return self
def expire(self, *args):
self._commands.append(("expire", args))
return self
def execute(self):
results = []
for cmd, args in self._commands:
results.append(getattr(self._client, cmd)(*args))
self._commands.clear()
return results
class DummyRedis:
def __init__(self):
self._store = {}
self._lists = {}
def _check_expired(self, key):
if key in self._store:
value, exp = self._store[key]
if exp is not None and time.time() > exp:
del self._store[key]
def set(self, key, value, nx=False, ex=None):
self._check_expired(key)
if nx and key in self._store:
return None
expires_at = time.time() + ex if ex else None
self._store[key] = (value, expires_at)
return True
def get(self, key):
self._check_expired(key)
if key not in self._store:
return None
return self._store[key][0]
def delete(self, key):
self._store.pop(key, None)
def lpush(self, key, value):
self._lists.setdefault(key, [])
self._lists[key].insert(0, value)
def ltrim(self, key, start, end):
self._lists.setdefault(key, [])
self._lists[key] = self._lists[key][start : end + 1]
def lrange(self, key, start, end):
lst = self._lists.get(key, [])
if end == -1:
end = len(lst) - 1
return lst[start : end + 1]
def pipeline(self):
return DummyPipeline(self)
def incr(self, key):
self._check_expired(key)
value = int(self._store.get(key, ("0", None))[0]) + 1
_, exp = self._store.get(key, (None, None))
self._store[key] = (str(value), exp)
return value
def expire(self, key, ttl):
if key in self._store:
value, _ = self._store[key]
self._store[key] = (value, time.time() + ttl)
return True
return False
from vertex_flow.tests.utils import DummyPipeline, DummyRedis

Copilot uses AI. Check for mistakes.
Comment on lines +43 to +45
try: # pragma: no cover - optional dependency
import pymysql # noqa: F401
except Exception as exc:
Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broad Exception catch should be more specific. Consider catching ImportError or ModuleNotFoundError instead, as these are the expected exceptions when a module is not installed.

Suggested change
try: # pragma: no cover - optional dependency
import pymysql # noqa: F401
except Exception as exc:
except ImportError as exc:

Copilot uses AI. Check for mistakes.
Comment on lines +8 to +10
try: # pragma: no cover - optional dependency
import redis
except Exception: # pragma: no cover
Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broad Exception catch should be more specific. Consider catching ImportError or ModuleNotFoundError instead, as these are the expected exceptions when a module is not installed.

Suggested change
try: # pragma: no cover - optional dependency
import redis
except Exception: # pragma: no cover
except ImportError: # pragma: no cover

Copilot uses AI. Check for mistakes.
Comment on lines +13 to +15
try: # pragma: no cover - optional dependency
import sqlalchemy as sa
except Exception: # pragma: no cover
Copy link

Copilot AI Aug 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broad Exception catch should be more specific. Consider catching ImportError or ModuleNotFoundError instead, as these are the expected exceptions when a module is not installed.

Suggested change
try: # pragma: no cover - optional dependency
import sqlalchemy as sa
except Exception: # pragma: no cover
except ImportError: # pragma: no cover

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants